From 4db4e0fa242ec8cf6da9e921cc309cdbd5cb2140 Mon Sep 17 00:00:00 2001 From: Jeroen van der Heijden Date: Fri, 28 Sep 2018 10:48:49 +0200 Subject: [PATCH] Test with buffer --- src/siri/db/buffer.c | 104 +++++++++++++++++++++++++++++-------------- 1 file changed, 70 insertions(+), 34 deletions(-) diff --git a/src/siri/db/buffer.c b/src/siri/db/buffer.c index 5aac01df..b6a0ab80 100644 --- a/src/siri/db/buffer.c +++ b/src/siri/db/buffer.c @@ -26,10 +26,14 @@ /* when set to 1, no caching is done. 1 is the minimum value. */ #define SIRIDB_BUFFER_CACHE 64 -static int BUFFER_create_new(siridb_t * siridb, siridb_series_t * series); -static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series); +static int buffer__create_new(siridb_t * siridb, siridb_series_t * series); +static int buffer__use_empty(siridb_t * siridb, siridb_series_t * series); +static int buffer__write_start(siridb_t * siridb, siridb_series_t * series); +static void buffer__migrate_to_new(char * pt); -static const uint64_t BUFFER_end = 0xffffffffffffffff; +/* buffer__start cannot conflict with a series_id since id 0 is never used */ +static const uint32_t buffer__start = 0x00000000; +static const uint64_t buffer__end = 0xffffffffffffffff; /* @@ -46,7 +50,7 @@ int siridb_buffer_write_empty( SEEK_SET) || /* write end ts */ - fwrite( &BUFFER_end, + fwrite( &buffer__end, sizeof(uint64_t), 1, siridb->buffer_fp) != 1) ? EOF : 0; @@ -72,10 +76,7 @@ int siridb_buffer_write_last_point( memcpy(buf, &point->ts, sizeof(uint64_t)); memcpy(buf + sizeof(uint64_t), &point->val, sizeof(qp_via_t)); - memcpy( - buf + sizeof(uint64_t) + sizeof(qp_via_t), - &BUFFER_end, - sizeof(uint64_t)); + memcpy(buf + 16, &buffer__end, sizeof(uint64_t)); return ( /* jump to position where to write the new point */ @@ -100,8 +101,8 @@ int siridb_buffer_new_series(siridb_t * siridb, siridb_series_t * series) } return (siridb->empty_buffers->len) ? - BUFFER_use_empty(siridb, series) : - BUFFER_create_new(siridb, series); + buffer__use_empty(siridb, series) : + buffer__create_new(siridb, series); } int siridb_buffer_fsync(siridb_t * siridb) @@ -126,6 +127,24 @@ int siridb_buffer_open(siridb_t * siridb) return 0; } +static void buffer__migrate_to_new(char * pt) +{ + char * npt = pt; + uint32_t series_id = *((uint32_t *) pt); + pt += sizeof(uint32_t); + size_t num = *((size_t *) pt); + pt += sizeof(size_t); + + memcpy(npt, &buffer__start, sizeof(uint32_t)); + npt += sizeof(uint32_t); + memcpy(npt, &series_id, sizeof(uint32_t)); + npt += sizeof(uint32_t); + + memmove(npt, pt, num * 16); + npt += num * 16; + memcpy(npt, &buffer__end, sizeof(uint64_t)); +} + /* * Returns 0 if successful or -1 in case of an error. * (signal might be raised) @@ -135,11 +154,14 @@ int siridb_buffer_load(siridb_t * siridb) FILE * fp; FILE * fp_temp; size_t read_at_once = 8; - size_t num, i, j; + size_t num, i; char buffer[siridb->buffer_size * read_at_once]; char * pt; long int offset = 0; siridb_series_t * series; + _Bool log_migrate = 1; + uint32_t buf_start, series_id; + uint64_t * ts; log_info("Loading and cleanup buffer"); @@ -182,11 +204,24 @@ int siridb_buffer_load(siridb_t * siridb) { for (i = 0; i < num; i++) { - pt = buffer + i * siridb->buffer_size; - series = (siridb_series_t *) - imap_get(siridb->series_map, *((uint32_t *) pt)); + buf_start = *((uint32_t *) pt); + if (buf_start != buffer__start) + { + if (log_migrate) + { + log_warning("Buffer will be migrated"); + log_migrate = 0; + } + buffer__migrate_to_new(pt); + } + + pt += sizeof(uint32_t); + series_id = *((uint32_t *) pt); + pt += sizeof(uint32_t); + + series = imap_get(siridb->series_map, series_id); if (series == NULL) { @@ -206,16 +241,10 @@ int siridb_buffer_load(siridb_t * siridb) series->bf_offset = offset; - pt += sizeof(uint32_t); - - for ( j = (size_t) *pt, pt += sizeof(size_t); - j--; - pt += 16) + for (; *(ts = (uint64_t *) pt) != buffer__end; pt += 16) { - siridb_points_add_point( - series->buffer, - (uint64_t *) pt, - (qp_via_t *) (pt + 8)); + qp_via_t * val = (qp_via_t *) (pt + 8); + siridb_points_add_point(series->buffer, ts, val); } offset += siridb->buffer_size; @@ -248,6 +277,19 @@ int siridb_buffer_load(siridb_t * siridb) return 0; } +static int buffer__write_start(siridb_t * siridb, siridb_series_t * series) +{ + const size_t sz = sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t); + char buf[sz]; + + memcpy(buf, &buffer__start, sizeof(uint32_t)); + memcpy(buf + sizeof(uint32_t), &series->id, sizeof(uint32_t)); + memcpy(buf + sizeof(uint64_t), &buffer__end, sizeof(uint64_t)); + + /* write series ID and 0 length to buffer */ + return (fwrite(buf, sz, 1, siridb->buffer_fp) == 1) ? 0 : -1; +} + /* * Reserve a space in the buffer for a new series. The position of this space * in the buffer is read from siridb->empty_buffers so this list must have @@ -258,11 +300,8 @@ int siridb_buffer_load(siridb_t * siridb) * Note that an available spot must be checked before calling this function. * This functions has undefined behavior if no spot is found. */ -static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series) +static int buffer__use_empty(siridb_t * siridb, siridb_series_t * series) { - const size_t sz = sizeof(uint32_t) + sizeof(size_t); - char buf[sz]; - series->bf_offset = (long int) slist_pop(siridb->empty_buffers); /* jump to the correct buffer position */ @@ -272,11 +311,8 @@ static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series) return -1; } - memcpy(buf, &series->id, sizeof(uint32_t)); - memcpy(buf + sizeof(uint32_t), &series->buffer->len, sizeof(size_t)); - /* write series ID and 0 length to buffer */ - if (fwrite(buf, sz, 1, siridb->buffer_fp) != 1) + if (buffer__write_start(siridb, series)) { ERR_FILE return -1; @@ -292,7 +328,7 @@ static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series) * * Returns 0 if successful or -1 and a signal is raised in case of an error. */ -static int BUFFER_create_new(siridb_t * siridb, siridb_series_t * series) +static int buffer__create_new(siridb_t * siridb, siridb_series_t * series) { long int buffer_pos; /* get file descriptor */ @@ -318,8 +354,8 @@ static int BUFFER_create_new(siridb_t * siridb, siridb_series_t * series) return -1; } - /* write series ID to buffer */ - if (fwrite(&series->id, sizeof(uint32_t), 1, siridb->buffer_fp) != 1) + /* write buffer start and series ID to buffer */ + if (buffer__write_start(siridb, series)) { ERR_FILE return -1; -- 2.30.2